Windows IOCP 示例

这个示将接收到的数据原样返回给客户端:

1#include <WinSock2.h>
2#include <WS2tcpip.h>
3#include <MSWSock.h>
4#include <cstdio>
5
6#define BUFFER_SIZE 1024
7
8enum class IocpType
9{
10    ACCEPT,
11    RECV,
12    SEND
13};
14
15struct IocpContext : public WSAOVERLAPPED
16{
17    IocpContext() : 
18        WSAOVERLAPPED{}
19    {
20
21    }
22
23    WSABUF wsaBuf{BUFFER_SIZE, buffer};
24    char buffer[BUFFER_SIZE];
25    SOCKET sock;
26    IocpType type;
27};
28
29// 提交一个异步的 accept 操作
30bool postAccpet(SOCKET server)
31{
32    // 创建上下文
33    IocpContext* ctx = new IocpContext;
34    ctx->type = IocpType::ACCEPT;
35
36    // 创建接收连接的socket
37    ctx->sock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, WSA_FLAG_OVERLAPPED);
38    if (ctx->sock == SOCKET_ERROR)
39    {
40        fprintf(stderr, "WSASocketW: %s\n", strerror(WSAGetLastError()));
41        closesocket(ctx->sock);
42        return false;
43    }
44
45    // 加载 AcceptEX 函数
46    LPFN_ACCEPTEX lpfnAcceptEx = NULL;
47    GUID GuidAcceptEx = WSAID_ACCEPTEX;
48    DWORD dwBytes;
49    {
50        int ret = WSAIoctl(server, SIO_GET_EXTENSION_FUNCTION_POINTER,
51                            &GuidAcceptEx, sizeof (GuidAcceptEx), 
52                            &lpfnAcceptEx, sizeof (lpfnAcceptEx), 
53                            &dwBytes, NULL, NULL);
54        if (ret == SOCKET_ERROR)
55        {
56            fprintf(stderr, "WSAIoctl: %s\n", strerror(WSAGetLastError()));
57            closesocket(ctx->sock);
58            return false;
59        }
60    }
61    
62    // 通过 AcceptEx 发起异步的 accept 操作
63    {
64        DWORD addrlen = sizeof(struct sockaddr_in);
65        DWORD recvlen;
66        BOOL ret = lpfnAcceptEx(server, 
67                                ctx->sock, 
68                                ctx->buffer, 
69                                BUFFER_SIZE - 2*(addrlen+16), 
70                                addrlen + 16, 
71                                addrlen + 16, 
72                                &recvlen, 
73                                ctx);
74        if (!ret && WSAGetLastError() != ERROR_IO_PENDING)
75        {
76            fprintf(stderr, "AcceptEx: %s\n", strerror(WSAGetLastError()));
77            closesocket(ctx->sock);
78            return false;
79        }
80    }
81
82    return true;
83}
84
85// 提交一个异步的 RECV 操作
86bool postRecv(SOCKET sock)
87{
88    IocpContext* ctx = new IocpContext;
89    ctx->sock = sock;
90    ctx->type = IocpType::RECV;
91    DWORD nBytes = BUFFER_SIZE;
92    DWORD flags = 0;
93    int ret = WSARecv(sock, &(ctx->wsaBuf), 1, &nBytes, &flags, ctx, nullptr);
94    if (ret == SOCKET_ERROR && WSAGetLastError() != ERROR_IO_PENDING)
95    {
96        fprintf(stderr, "WSARecv: %s\n", strerror(WSAGetLastError()));
97        return false;
98    }
99
100    return true;
101}
102
103// 提交一个异步的 SEND 操作
104bool postSend(SOCKET sock, const char* data, DWORD size)
105{
106    IocpContext* ctx = new IocpContext;
107    ctx->sock = sock;
108    ctx->type = IocpType::SEND;
109    memcpy(ctx->buffer, data, size);
110    DWORD nBytes = size;
111    ctx->wsaBuf.len = size;
112    DWORD flags = 0;
113    int ret = WSASend(sock, &(ctx->wsaBuf), 1, &nBytes, flags, ctx, nullptr);
114    if (ret == SOCKET_ERROR && WSAGetLastError() != ERROR_IO_PENDING)
115    {
116        fprintf(stderr, "WSASend: %s\n", strerror(WSAGetLastError()));
117        return false;
118    }
119
120    return true;
121}
122
123int main()
124{
125    WSAData wsa;
126    if (WSAStartup(0x202, &wsa) != NO_ERROR)
127    {
128        fprintf(stderr, "WSAStartup failed\n");
129        return EXIT_FAILURE;
130    }
131
132    // 创建服务socket
133    SOCKET server = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, WSA_FLAG_OVERLAPPED);
134    if (server == INVALID_SOCKET)
135    {
136        fprintf(stderr, "WSASocketW: %s\n", strerror(WSAGetLastError()));
137        WSACleanup();
138        return EXIT_FAILURE;
139    }
140    
141    // 设为非阻塞
142    unsigned long value = 1;
143    if (ioctlsocket(server, FIONBIO, &value) == SOCKET_ERROR)
144    {
145        fprintf(stderr, "ioctlsocket: %s\n", strerror(WSAGetLastError()));
146        closesocket(server);
147        WSACleanup();
148        return EXIT_FAILURE;
149    }
150
151    // 绑定端口
152    struct sockaddr_in address {};
153    address.sin_family = AF_INET;
154    address.sin_port = htons(8080);
155    inet_pton(AF_INET, "127.0.0.1", &address.sin_addr);
156    if (bind(server, (const sockaddr*)(&address), sizeof(address)) == SOCKET_ERROR)
157    {
158        fprintf(stderr, "bind: %s\n", strerror(WSAGetLastError()));
159        closesocket(server);
160        WSACleanup();
161        return EXIT_FAILURE;
162    }
163
164    // 监听
165    if (listen(server, SOMAXCONN) == SOCKET_ERROR)
166    {
167        fprintf(stderr, "listen: %s\n", strerror(WSAGetLastError()));
168        closesocket(server);
169        WSACleanup();
170        return EXIT_FAILURE;
171    }
172
173    // 创建 IOCP handle
174    HANDLE handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
175    if (handle == INVALID_HANDLE_VALUE)
176    {
177        fprintf(stderr, "CreateIoCompletionPort: %s\n", strerror(WSAGetLastError()));
178        closesocket(server);
179        WSACleanup();
180        return EXIT_FAILURE;
181    }
182
183    // 将 server 绑定到 IOCP 上
184    if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(server), handle, 0, 0) == nullptr)
185    {
186        fprintf(stderr, "CreateIoCompletionPort: %s\n", strerror(WSAGetLastError()));
187        closesocket(server);
188        WSACleanup();
189        return EXIT_FAILURE;
190    }
191
192    // 提交一个异步的 accept 操作
193    if (postAccpet(server) == false)
194    {
195        closesocket(server);
196        WSACleanup();
197        return EXIT_FAILURE;
198    }
199
200    while (true)
201    {
202        // 等待操作完成
203        IocpContext* ctx = nullptr;
204        DWORD lpNumberOfBytesTransferred = 0;
205        void* lpCompletionKey = nullptr;
206        {
207            BOOL ret = GetQueuedCompletionStatus(
208                        handle,
209                        &lpNumberOfBytesTransferred,
210                        (PULONG_PTR) &lpCompletionKey,
211                        (LPOVERLAPPED *) &ctx,
212                        INFINITE);
213            if (!ret)
214                continue;
215        }
216        
217        // 处理 ACCEPT 操作
218        if (ctx->type == IocpType::ACCEPT)
219        {   
220            // 重新发起一个异步的 accept 操作,接收下一个连接
221            postAccpet(server);
222
223            // 将连接设为非阻塞
224            unsigned long value = 1;
225            if (ioctlsocket(ctx->sock, FIONBIO, &value) == SOCKET_ERROR)
226            {
227                fprintf(stderr, "ioctlsocket: %s\n", strerror(WSAGetLastError()));
228                closesocket(ctx->sock);
229                delete ctx;
230                continue;
231            }
232
233            // 将连接绑定到 IOCP 上
234            if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(ctx->sock), handle, 0, 0) == nullptr)
235            {
236                fprintf(stderr, "CreateIoCompletionPort: %s\n", strerror(WSAGetLastError()));
237                closesocket(ctx->sock);
238                delete ctx;
239                continue;
240            }
241
242            // 发起一个异步的 send 操作,NOTE: AcceptEx 会读取第一帧数据
243            if (lpNumberOfBytesTransferred > 0)
244                postSend(ctx->sock, ctx->buffer, lpNumberOfBytesTransferred);
245
246            // 发起一个异步的 recv 操作,接受后续数据
247            if (postRecv(ctx->sock) == false)
248            {
249                closesocket(ctx->sock);
250                delete ctx;
251                continue;
252            }
253
254            delete ctx;
255            continue;
256        }
257
258        // 处理 RECV 操作
259        if (ctx->type == IocpType::RECV)
260        {
261            // 连接出错或断开
262            if (lpNumberOfBytesTransferred <= 0)
263            {
264                closesocket(ctx->sock);
265                delete ctx;
266                continue;
267            }
268
269            // 发起一个异步的 send 操作
270            postSend(ctx->sock, ctx->buffer, lpNumberOfBytesTransferred);
271
272            // 发起一个异步的 recv 操作,接收后续数据
273            if (postRecv(ctx->sock) == false)
274            {
275                closesocket(ctx->sock);
276                delete ctx;
277                continue;
278            }
279
280            delete ctx;
281            continue;
282        }
283
284        // 处理 SEND 操作
285        if (ctx->type == IocpType::SEND)
286        {
287            delete ctx;
288            continue;
289        }
290    }
291}

使用 Apache HTTP server benchmarking tool 进行测试,结果如下:

1Server Software:
2Server Hostname:        localhost
3Server Port:            8080
4
5Document Path:          /
6Document Length:        0 bytes
7
8Concurrency Level:      10000
9Time taken for tests:   207.340 seconds
10Complete requests:      10000000
11Failed requests:        0
12Non-2xx responses:      10000000
13Keep-Alive requests:    10000000
14Total transferred:      1060000000 bytes
15HTML transferred:       0 bytes
16Requests per second:    48229.98 [#/sec] (mean)
17Time per request:       207.340 [ms] (mean)
18Time per request:       0.021 [ms] (mean, across all concurrent requests)
19Transfer rate:          4992.56 [Kbytes/sec] received
20
21Connection Times (ms)
22              min  mean[+/-sd] median   max
23Connect:        0    0   0.0      0      16
24Processing:   114  205  19.3    204    1214
25Waiting:        0  205  19.3    204    1214
26Total:        114  205  19.3    204    1214
27
28Percentage of the requests served within a certain time (ms)
29  50%    204
30  66%    205
31  75%    206
32  80%    206
33  90%    208
34  95%    210
35  98%    212
36  99%    213
37 100%   1214 (longest request)